---
description:
  A GraphQL subscription initiates an event stream, where each event in the stream is pushed from
  the server to the client that issued the subscription.
---

import { Callout } from '@theguild/components'

# Subscriptions

A GraphQL subscription initiates an event stream, where each event in the stream is pushed from the
server to the client that issued the subscription.

Example use cases for subscriptions are applications that require real-time data such as chat
applications or stock trading platforms.

GraphQL Yoga uses
[GraphQL over Server-Sent Events spec "distinct connections mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode)
for subscriptions. You're recommended to use [`graphql-sse`](https://the-guild.dev/graphql/sse) on
the client.

<Callout>
  Compared to implementing query and mutation resolvers, subscriptions are more complex to implement
  as they require additional infrastructure for scenarios where you have more than one instance of
  your GraphQL server running as the event streams must be distributed across all servers.
</Callout>

## Quick Start with Server-Sent Events (SSE)

Subscriptions can be added by extending your GraphQL schema with a `Subscription` type.

```ts filename="Subscription countdown implementation"
import { createServer } from 'node:http'
import { setTimeout as setTimeout$ } from 'node:timers/promises'
import { createSchema, createYoga } from 'graphql-yoga'

// Provide your schema
const yoga = createYoga({
  schema: createSchema({
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }

      type Subscription {
        countdown(from: Int!): Int!
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world'
      },
      Subscription: {
        countdown: {
          // This will return the value on every 1 sec until it reaches 0
          subscribe: async function* (_, { from }) {
            for (let i = from; i >= 0; i--) {
              await setTimeout$(1000)
              yield { countdown: i }
            }
          }
        }
      }
    }
  })
})

const server = createServer(yoga)
server.listen(4000, () => {
  console.info('Server is running on http://localhost:4000/graphql')
})
```

Start the server, visit http://localhost:4000/graphql and paste the following operation into the
left panel.

```graphql filename="Subscription countdown operation"
subscription {
  countdown(from: 5)
}
```

Then press the Play (Execute Query) button.

Alternatively, you can also send the subscription operation via curl.

```bash
$ curl -N -H "accept:text/event-stream" http://localhost:4000/graphql?query=subscription%20%7B%0A%20%20countdown%28from%3A%205%29%0A%7D
event: next
data: {"data":{"countdown":5}}

event: next
data: {"data":{"countdown":4}}

event: next
data: {"data":{"countdown":3}}

event: next
data: {"data":{"countdown":2}}

event: next
data: {"data":{"countdown":1}}

event: next
data: {"data":{"countdown":0}}
```

{/* prettier-ignore */}
{/* TODO: use graphql-sse instead

<Callout>
  Watch [Episode #52 of
  `graphql.wtf`](https://graphql.wtf/episodes/52-graphql-subscriptions-with-server-sent-events) for
  a quick introduction to using GraphQL Subscriptions with Server Sent Events:
</Callout>

<iframe
  src="https://youtube.com/embed/oZtTut7QsZ8"
  title="GraphQL Subscriptions with Server Sent Events"
/>
*/}

## Handling Subscriptions on the Client

The [`graphql-sse` library](https://the-guild.dev/graphql/sse) should be used for GraphQL Yoga
subscriptions. Please advise the [`graphql-sse` recipes](https://the-guild.dev/graphql/sse/recipes)
for various examples of client integrations (including Apollo, Relay and Urql).

## GraphQL over Server-Sent Events Protocol (via `graphql-sse`)

There are two different modes in
[GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md).
You can see the differences in the
[protocol specification](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md).

### Distinct Connections Mode

GraphQL Yoga supports
[GraphQL over Server-Sent Events spec "distinct connections mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode)
out of the box.

### Single Connection Mode

In case you want the subscriptions to be transported following the
[GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode)
also in the "single connection mode", you simply use the `@graphql-yoga/plugin-graphql-sse` plugin
for GraphQL Yoga that exposes an additional endpoint (defaulting to `/graphql/stream`) used for
[`graphql-sse`](https://the-guild.dev/graphql/sse) clients.

The plugin will hijack the request from the `onRequest` hook and will use **all** envelop plugins
provided.

```ts filename="yoga-with-graphql-sse.ts"
import { createServer } from 'node:http'
import { createYoga } from 'graphql-yoga'
import { useGraphQLSSE } from '@graphql-yoga/plugin-graphql-sse'

const yogaApp = createYoga({
  plugins: [
    // Simply install the graphql-sse plugin and you're off!
    useGraphQLSSE()
  ]
})

// Get NodeJS Server from Yoga
const httpServer = createServer(yogaApp)

httpServer.listen(4000, () => {
  console.log('Server is running on port 4000')
})
```

<Callout>
  Check out [our working
  example](https://github.com/graphql-hive/graphql-yoga/tree/main/examples/graphql-sse) for this
  integration.
</Callout>

### Client Integration

Please refer to the [`graphql-sse` client recipes](https://the-guild.dev/graphql/sse/recipes).

## GraphQL over WebSocket Protocol (via `graphql-ws`)

Suppose you want to use the
[`graphql-transport-ws`](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md) protocol
with GraphQL Yoga, you can use the [`graphql-ws`](https://github.com/enisdenjo/graphql-ws) library.
To have the same execution pipeline in `graphql-ws`, we can use the **Envelop** instance from
GraphQL Yoga like below in Node.JS. Also, you can set `subscriptionsProtocol` in GraphiQL options to
use WebSockets instead of Server-Sent Events within GraphiQL.

```ts filename="yoga-with-ws.ts"
import { createServer } from 'node:http'
import { useServer } from 'graphql-ws/use/ws'
import { createYoga } from 'graphql-yoga'
import { WebSocketServer } from 'ws'

const yogaApp = createYoga({
  graphiql: {
    // Use WebSockets in GraphiQL
    subscriptionsProtocol: 'WS'
  }
})

// Get NodeJS Server from Yoga
const httpServer = createServer(yogaApp)
// Create WebSocket server instance from our Node server
const wsServer = new WebSocketServer({
  server: httpServer,
  path: yogaApp.graphqlEndpoint
})

// Integrate Yoga's Envelop instance and NodeJS server with graphql-ws
useServer(
  {
    execute: (args: any) => args.rootValue.execute(args),
    subscribe: (args: any) => args.rootValue.subscribe(args),
    onSubscribe: async (ctx, _id, params) => {
      const { schema, execute, subscribe, contextFactory, parse, validate } = yogaApp.getEnveloped({
        ...ctx,
        req: ctx.extra.request,
        socket: ctx.extra.socket,
        params
      })

      const args = {
        schema,
        operationName: params.operationName,
        document: parse(params.query),
        variableValues: params.variables,
        contextValue: await contextFactory(),
        rootValue: {
          execute,
          subscribe
        }
      }

      const errors = validate(args.schema, args.document)
      if (errors.length) return errors
      return args
    }
  },
  wsServer
)

httpServer.listen(4000, () => {
  console.log('Server is running on port 4000')
})
```

<Callout>
  Check out [our working
  example](https://github.com/graphql-hive/graphql-yoga/tree/main/examples/graphql-ws) for this
  integration.
</Callout>

### Client Integration

Please refer to the [`graphql-ws` client recipes](https://github.com/enisdenjo/graphql-ws#recipes).

## SSE vs WebSocket

#### Advantages of SSE over WebSockets

- Transported over simple HTTP instead of a custom protocol
- Built in support for re-connection and event-id Simpler protocol
- No trouble with corporate firewalls doing packet inspection

#### Advantages of WebSockets over SSE

- Real time, two directional communication
- Lower latency
- All operations multiplexed over the same connection, no browser limits

#### SSE Gotchas

- [Maximum open connections limit](https://developer.mozilla.org/en-US/docs/Web/HTTP/Connection_management_in_HTTP_1.x)
  ([higher (~100) when using http/2](https://developer.mozilla.org/en-US/docs/Glossary/HTTP_2)). In
  this case, consider using the
  [`graphql-sse` integration](#graphql-over-server-sent-events-protocol-via-graphql-sse) with
  ["single connection mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode).

## PubSub

### Getting Started

GraphQL Yoga comes with a built-in PubSub (publish/subscribe) bus. This makes it easy to send new
events to the client from within your mutation resolvers.

```ts filename="server.ts"
import { createServer } from 'node:http'
import { createPubSub, createSchema, createYoga } from 'graphql-yoga'

const pubSub = createPubSub()

// Provide your schema
const yoga = createYoga({
  schema: createSchema({
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }

      type Subscription {
        randomNumber: Float!
      }

      type Mutation {
        broadcastRandomNumber: Boolean
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world'
      },
      Subscription: {
        randomNumber: {
          // subscribe to the randomNumber event
          subscribe: () => pubSub.subscribe('randomNumber'),
          resolve: payload => payload
        }
      },
      Mutation: {
        broadcastRandomNumber: (_, args) => {
          // publish a random number
          pubSub.publish('randomNumber', Math.random())
        }
      }
    }
  })
})

const server = createServer(yoga)
server.listen(4000, () => {
  console.info('Server is running on http://localhost:4000/graphql')
})
```

### Topics

When using TypeScript it is possible to make the event emitter type-safe by providing a channel
configuration via a generic.

```ts
const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()

pubsub.subscribe('randomNumber')

// This is now type-safe.
pubSub.publish('randomNumber', 1)

// This causes a TypeScript error.
pubSub.publish('randomNumber')

// This causes a TypeScript error.
pubSub.publish('event does not exist')
```

You can subscribe to a specific topic using `pubSub.subscribe`.

```ts
const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()

// Usage outside a GraphQL subscribe function
async function subscribe() {
  const eventSource = pubSub.subscribe('randomNumber')

  for await (const value of eventSource) {
    console.log(value)
    // dispose subscription after the first event has been published.
    eventSource.return()
  }
}

subscribe()

pubSub.publish('randomNumber', 3)
```

You can publish a value using `pubSub.publish`.

```ts
const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()

pubSub.publish('randomNumber', 3)
```

#### Topic Configuration Variants

You can declare events with and without a payload.

```ts
const pubSub = createPubSub<{
  // event has no payload
  'event:without:payload': []
  // event has payload of type number
  'event:payload:number': [payload: number]
  // event has payload of type { foo: number }
  'event:payload:obj': [payload: { foo: number }]
}>()

pubSub.publish('event:without:payload')
pubSub.publish('event:payload:number', 12)
pubSub.publish('event:payload:obj', { foo: 1 })
```

#### Topic with Dynamic ID

Sometimes you only want to emit and listen for events for a specific entity (e.g. user or product).
You can declare topics scoped to a special identifier.

```ts
const pubSub = createPubSub<{
  'user:followerCount': [userId: string, payload: { followerCount: number }]
}>()

const userId1 = '420'
const userId2 = '69'

// the userId argument is enforced by the TypeScript compiler.
pubSub.subscribe('user:followerCount', userId1)
pubSub.subscribe('user:followerCount', userId2)

pubSub.publish('user:followerCount', userId1, { followerCount: 30 })
pubSub.publish('user:followerCount', userId2, { followerCount: 12 })
```

### Distributed Pub/Sub for Production

If you spin up multiple instances of your GraphQL server each server instance will have their own
in-memory pub/sub instance. An event triggered on the one server instance will not be distributed to
the other server instances, resulting in subscribers on the other server not receiving any updates.

The `createPubSub` function allows you to specify a custom
[`EventTarget`](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget) implementation, which
can use an external datastore for distributing the events across all server replicas such as
[`Redis Pub/Sub`](https://redis.io/topics/pubsub) or [`Kafka`](https://kafka.apache.org/).

The minimal `EventTarget` implementation is described by the
[`TypedEventTarget` interface](https://github.com/graphql-hive/graphql-yoga/blob/master/packages/event-target/typed-event-target/src/index.ts#L18).

Yoga comes with an `EventTarget` implementation for
[Redis Pub/Sub](https://redis.io/docs/manual/pubsub/).

```sh npm2yarn
npm i @graphql-yoga/redis-event-target ioredis
```

```ts
import { createPubSub } from 'graphql-yoga'
import { Redis } from 'ioredis'
import { createRedisEventTarget } from '@graphql-yoga/redis-event-target'

const publishClient = new Redis()
const subscribeClient = new Redis()

const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient
})

const pubSub = createPubSub({ eventTarget })
```

**Please note** that Redis Pub/Sub requires a stable long-running connection and thus is not a
suitable solution for serverless or edge function environments.

**Please also note** that the event payloads must be JSON serializable. If you want to send complex
data structures over the wire you can use tools such as
[`superjson`](https://github.com/blitz-js/superjson).

### Custom serializer

By default, messages will be serialized to Redis using the native `JSON` available in your
JavaScript environment. However, you can customize this by providing an alternative with the
`serializer` option that expects a `stringify` and a `parse` methods similar to the native `JSON`.
For example, you can install the [superjson](https://github.com/blitz-js/superjson) package and use
it in the Redis event target instead:

```ts
import SuperJSON from 'superjson'

const publishClient = new Redis()
const subscribeClient = new Redis()

const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient,
  serializer: SuperJSON
})
```

You can also provide your own logic if you want:

```ts
const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient,
  serializer: {
    stringify: data => 'some serialized data',
    parse: message => ({ some: 'deserialized data' })
  }
})
```

## Advanced

### Filter and Map Values

Sometimes it is useful to filter or map events for an individual subscription set up by a client
based on subscription field arguments. Yoga has a few utility functions that make this as simple as
possible.

```ts filename="Example PubSub event stream transform"
import { createPubSub, createServer, filter, map, pipe } from 'graphql-yoga'

const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()

const source = pipe(
  pubSub.subscribe('randomNumber'),
  map(publishedNumber => publishedNumber * 2),
  filter(multipliedNumber => multipliedNumber < 10)
)

;(async () => {
  for await (const value of source) {
    console.log(value)
  }
})()

pubSub.publish('randomNumber', 1) // logs 2
pubSub.publish('randomNumber', 2) // logs 4
pubSub.publish('randomNumber', 5) // filtered out
pubSub.publish('randomNumber', 3) // logs 6
source.return()
```

```ts filename="Example Random number stream transform"
import { createServer } from 'node:http'
import { createPubSub, createSchema, createYoga } from 'graphql-yoga'

const pubSub = createPubSub()

const yoga = createYoga({
  schema: createSchema({
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }

      type Subscription {
        randomNumber(multiplyBy: Int!, lessThan: Float!): Float!
      }

      type Mutation {
        broadcastRandomNumber: Boolean
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world'
      },
      Subscription: {
        randomNumber: {
          // subscribe to the randomNumber event
          subscribe: (_, args) =>
            pipe(
              pubSub.subscribe('randomNumber'),
              map(publishedNumber => publishedNumber * args.multiplyBy),
              filter(multipliedNumber => multipliedNumber < args.lessThan)
            ),
          resolve: payload => payload
        }
      },
      Mutation: {
        broadcastRandomNumber: (_, args) => {
          // publish a random number
          pubSub.publish('randomNumber', Math.random())
        }
      }
    }
  })
})

const server = createServer(yoga)
server.listen(4000, () => {
  console.info('Server is running on http://localhost:4000/graphql')
})
```

### Subscriptions with Initial Value

GraphQL subscriptions are primarily designed to send a stream of events to the client. Sometimes it
is useful to send an initial value to a client as soon as the GraphQL subscription is set up.

An example for this would be a `Subscription.globalCounter` field that syncs a counter with all
clients by streaming the initial counter value to a client that sets up the subscription and then,
furthermore, streams the updated counter value to the clients every time it changes.

GraphQL subscriptions are implemented using
[Async Iteration](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator),
which in itself is a very complex topic with a lot of pitfalls that can cause memory leaks if not
treated with caution.

Yoga uses and re-exports [`Repeater.js`](https://repeater.js.org/) ("The missing constructor for
creating safe async iterators") for providing a friendly developer experience.

```ts
import { createServer } from 'node:http'
import { createPubSub, createSchema, createYoga, map, pipe, Repeater } from 'graphql-yoga'

let globalCounter = 0
const pubSub = createPubSub()

const yoga = createYoga({
  schema: createSchema({
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }

      type Subscription {
        globalCounter: Int!
      }

      type Mutation {
        incrementGlobalCounter: Int!
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world'
      },
      Subscription: {
        globalCounter: {
          // Merge initial value with source stream of new values
          subscribe: () =>
            pipe(
              Repeater.merge([
                // cause an initial event so the
                // globalCounter is streamed to the client
                // upon initiating the subscription
                undefined,
                // event stream for future updates
                pubSub.subscribe('globalCounter:change')
              ]),
              // map all stream values to the latest globalCounter
              map(() => globalCounter)
            ),
          resolve: payload => payload
        }
      },
      Mutation: {
        incrementGlobalCounter: () => {
          globalCounter = globalCounter + 1
          // publish a global counter increment event
          pubSub.publish('globalCounter:change')
          return globalCounter
        }
      }
    }
  })
})

const server = createServer(yoga)
server.listen(4000, () => {
  console.info('Server is running on http://localhost:4000/graphql')
})
```

### Listen to Multiple Pub/Sub Topics

Sometimes it is handy to subscribe to multiple PubSub topics instead of a single one.

```ts
import { createServer } from 'node:http'
import { createPubSub, createSchema, createYoga, map, pipe, Repeater } from 'graphql-yoga'

type User = {
  id: string
  login: string
}

let user: User | null = {
  id: '1',
  login: 'Laurin'
}

const pubSub = createPubSub<{
  userLoginChanged: []
  userDeleted: []
}>()

const yoga = createYoga({
  schema: createSchema({
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }

      type User {
        id: ID!
        login: String
      }

      type Subscription {
        user: User
      }

      type Mutation {
        deleteUser: Boolean
        updateUserLogin(newLogin: String!): Boolean
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world'
      },
      Subscription: {
        user: {
          // Merge initial value with source streams of new values
          subscribe: () =>
            pipe(
              Repeater.merge([
                pubSub.subscribe('userLoginChanged'),
                pubSub.subscribe('userDeleted')
              ]),
              // map all stream values to the latest user
              map(() => user)
            ),
          resolve: payload => payload
        }
      },
      Mutation: {
        deleteUser() {
          user = null
          pubSub.publish('userDeleted')
          return true
        },
        updateUserLogin(_, args) {
          if (!user) {
            return false
          }
          user.login = args.newLogin
          pubSub.publish('userLoginChanged')
          return true
        }
      }
    }
  })
})

const server = createServer(yoga)
server.listen(4000, () => {
  console.info('Server is running on http://localhost:4000/graphql')
})
```
